package com.denghq.projectbuilder.component.msgbus.bus.impl;

import com.denghq.projectbuilder.common.util.UUIDUtils;
import com.denghq.projectbuilder.component.msgbus.bus.IMsgProcessor;
import com.denghq.projectbuilder.component.msgbus.receiver.impl.RabbitMqMsgReceiver;
import com.denghq.projectbuilder.component.msgbus.sender.impl.RabbitMqMsgSender;
import com.denghq.projectbuilder.component.msgbus.sender.IMsg;
import com.denghq.projectbuilder.component.msgbus.sender.impl.SimpleMsg;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.SmartLifecycle;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * RabbitMQ implementation of the message bus.
 *
 * <p>Publishing is delegated to a {@link RabbitMqMsgSender}; subscriptions are recorded in the
 * topic-to-processor map exposed by a {@link RabbitMqMsgReceiver}. The class implements
 * {@link SmartLifecycle}: {@link #start()} starts the receiver and {@link #stop()} stops it,
 * each at most once per running/stopped transition.
 */
public class RabbitMqMsgBus extends AbstractMsgBus implements SmartLifecycle {

    /** Sender that every {@code publish} call is delegated to. */
    @Getter
    @Setter
    private RabbitMqMsgSender sender;

    /** Receiver that holds the topic-to-processor registrations and is started/stopped by this bus. */
    @Getter
    @Setter
    private RabbitMqMsgReceiver receiver;

    /** {@code true} between a successful {@link #start()} and the next {@link #stop()}. */
    private final AtomicBoolean running;

    /**
     * Creates a bus in the stopped state.
     *
     * @param sender   sender used by the {@code publish} methods
     * @param receiver receiver used for subscriptions and started/stopped with this bus
     */
    public RabbitMqMsgBus(RabbitMqMsgSender sender, RabbitMqMsgReceiver receiver) {
        this.sender = sender;
        this.receiver = receiver;
        this.running = new AtomicBoolean(false);
    }

    /**
     * Registers a processor for a topic by appending it to the receiver's processor list for that
     * topic, creating the list on first use.
     *
     * @param topic        topic to register the processor under
     * @param msgProcesser processor to add to the topic's list
     */
    @Override
    public void subscribe(String topic, IMsgProcessor msgProcesser) {
        // The lookup, optional list creation and append are done under the map's monitor so
        // concurrent subscribe calls on this bus cannot each create a list for the same topic.
        synchronized (receiver.getTopicObserverMap()) {
            List<IMsgProcessor> processerList = receiver.getTopicObserverMap().get(topic);
            if (processerList == null) {
                processerList = Lists.newArrayList();
                receiver.getTopicObserverMap().put(topic, processerList);
            }
            processerList.add(msgProcesser);
        }
    }

    /**
     * Publishes a text message by wrapping it in a {@link SimpleMsg} together with a value
     * obtained from {@link UUIDUtils#getUUID()}, then handing it to the sender.
     *
     * @param topic   topic to send to
     * @param message text payload
     */
    @Override
    public void publish(String topic, String message) {
        sender.sendMsg(topic, new SimpleMsg(message, UUIDUtils.getUUID()));
    }

    /**
     * Publishes an already-built message through the sender.
     *
     * @param topic   topic to send to
     * @param message message to send
     */
    @Override
    public void publish(String topic, IMsg message) {
        sender.sendMsg(topic, message);
    }


    /**
     * Starts the receiver. Called by {@link #start()}; calling it directly does not change the
     * value reported by {@link #isRunning()}.
     */
    public void init() {
        receiver.startWork();
    }


    /**
     * @return always {@code true}
     */
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Starts the bus if it is not already running. A repeated call while running is a no-op.
     *
     * <p>If {@link #init()} fails, the running flag is reset before the failure is rethrown, so
     * {@link #isRunning()} does not report a bus whose receiver never started and a later
     * {@code start()} can try again.
     */
    @Override
    public void start() {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        try {
            this.init();
        } catch (RuntimeException | Error e) {
            // Roll the flag back: the receiver did not start.
            this.running.set(false);
            throw e;
        }
    }

    /**
     * Stops the bus and then runs the completion callback.
     *
     * <p>The callback is invoked even when {@link #stop()} throws; the failure is then propagated
     * to the caller rather than being silently discarded.
     *
     * @param callback invoked once the stop attempt has finished, whether or not it succeeded
     */
    @Override
    public void stop(Runnable callback) {
        try {
            this.stop();
        } finally {
            callback.run();
        }
    }

    /**
     * Stops the receiver if the bus is currently running. A call while stopped is a no-op.
     */
    @Override
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            receiver.stopWork();
        }
    }

    /**
     * @return the current value of the running flag
     */
    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public int getPhase() {
        return 0;
    }
}
